iT邦幫忙

2024 iThome 鐵人賽

DAY 0
0
自我挑戰組

重新開始 elasticsearch 系列 第 28

2024 鐵人賽 Day29: Ingest Pipeline - Tweeter Data

  • 分享至 

  • xImage
  •  

今天要來用 ES 的 Ingest Pipeline 處理前面用來做 Auto complete 的 Tweeter 資料,先回顧一下:

  • 資料的樣子

    {'name': 'MaggieBreathnac',
     'user_id': 487990281,
     'tweet': 'Making memories #nanny #anpost #isolation #happy #corona @ An Rinn '
              'https://t.co/byA9uSVxFc',
     'tweet_id': 1244969855058677766,
     'retweets': 0,
     'favorites': 0,
     'created': '31-Mar-2020',
     'followers': 522,
     'is_user_verified': False,
     'geo': {'type': 'Point', 'coordinates': [52.04681279, -7.56678938]},
     'coordinates': {'type': 'Point', 'coordinates': [-7.56678938, 52.04681279]},
     'location': 'dublin',
     'primary_location': {'type': 'Point',
                          'coordinates': [-7.56678938, 52.04681279]}}
    
  • 希望他變成的樣子

    {
    		"name": 'MaggieBreathnac',
    		"user_id": 487990281,
    		"tweet": 'Making memories #nanny #anpost #isolation #happy #corona @ An Rinn '
              'https://t.co/byA9uSVxFc',
    		"tweet_id": 1244969855058677766,
    		"retweets": 0,
    		"favorites": 0,
    		"created": '31-Mar-2020',
    		"followers": 522,
    		"is_user_verified": False,
    		"geo": [-7.56678938, 52.04681279],
    		"location": 'dublin'
    }
    
  • python 做了什麼處理

    def data_to_es(json_file: str, index_name: str):
        with open(json_file, 'r') as f:
            data = json.load(f)
        fields_to_rm = ['primary_location', 'coordinates']
        for d in data:
            for f in fields_to_rm:
                if d.get(f):
                    d.pop(f)
            if d.get('geo'):
                d['geo'] = d['geo']['coordinates'].reverse()
            if d.get('created'):
                d['created'] = datetime.strptime(d['created'], "%d-%b-%Y")
            d['_index'] = index_name
            yield d
    
    1. 移除 primary_locationcoordinates 這兩個值
    2. 轉換 geo.coordinates 內的值並寫為 geo 欄位
    3. 解析 created 欄位為日期格式

這些轉換都蠻單純的,符合使用 ES ingest pipeline 的情境,以下是三個步驟的 Processor:

  1. 移除 primary_locationcoordinates 這兩個值

    
      {
        "remove": {
          "field": [
            "primary_location",
            "coordinates"
          ]
        }
      }
    
  2. 轉換 geo.coordinates 內的值並寫為 geo 欄位

    1. 擷取 geo.coordinates fields 為 geo field
    
      {
        "json": {
          "field": "geo.coordinates",
          "target_field": "geo"
        }
      }
    

    b. 將 geo 欄位內的經緯度反過來

      {
        "script": {
          "source": "ArrayList tmp = ctx[\"geo\"]; Collections.reverse(tmp); //ctx[\"geo\"]=[ctx[\"geo\"][1], ctx[\"geo\"][0]] ctx[\"geo\"] = tmp; "
        }
      }
    
  3. 解析 created 欄位為日期欄位

    
      {
        "date": {
          "field": "created",
          "formats": [
            "dd-MMM-yyyy"
          ],
          "target_field": "created"
        }
      }
    
    

這樣我們就完成了一條資料管線了,其中比較麻煩的是要的經緯度 arrayList 反序的 processor,需要理解 painless pipeline,但比起要為了這一條管線開一個環境安裝 python、維運 python script 單純多了。

這系列文章也終於告一個段落了,下一篇會來個總回顧~


上一篇
2024 鐵人賽 Day28: Ingest Pipeline
下一篇
2024 鐵人賽 Day30: I Made It
系列文
重新開始 elasticsearch 29
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言